{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Iris Flower Classification and Serving Using SkLearn, HopsML, and the Hopsworks Feature Store\n",
    "\n",
    "In this notebook we will:\n",
    "\n",
    "1. Load the Iris Flower dataset from HopsFS\n",
    "2. Do feature engineering on the dataset\n",
    "3. Save the features to the feature store\n",
    "4. Read the feature data from the feature store\n",
    "5. Train a KNN Model using SkLearn\n",
    "6. Save the trained model to HopsFS\n",
    "7. Launch a serving instance to serve the trained model\n",
    "8. Send some prediction requests to the served model\n",
    "9. Monitor the predictions through Kafka\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Imports"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [],
   "source": [
    "from sklearn.neighbors import KNeighborsClassifier\n",
    "import joblib\n",
    "from pyspark.ml.feature import StringIndexer\n",
    "from pyspark.sql.types import IntegerType\n",
    "import numpy as np\n",
    "import time\n",
    "import json\n",
    "from hops import kafka, hdfs, featurestore, serving\n",
    "from confluent_kafka import Producer, Consumer, KafkaError\n",
    "import random"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Load Dataset"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [],
   "source": [
    "project_path = hdfs.project_path()\n",
    "iris_df = spark.read.format(\"csv\").option(\"header\", \"true\").option(\"inferSchema\", True).load(\n",
    "    project_path + \"TourData/iris/iris.csv\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- sepal_length: double (nullable = true)\n",
      " |-- sepal_width: double (nullable = true)\n",
      " |-- petal_length: double (nullable = true)\n",
      " |-- petal_width: double (nullable = true)\n",
      " |-- variety: string (nullable = true)"
     ]
    }
   ],
   "source": [
    "iris_df.printSchema()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Feature Engineering\n",
    "\n",
    "The dataset is already well prepared. The only feature engineering we need to do is convert the `variety` column to a numeric label and save a lookup table, so that we can later convert the numeric representation back to the categorical one."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- sepal_length: double (nullable = true)\n",
      " |-- sepal_width: double (nullable = true)\n",
      " |-- petal_length: double (nullable = true)\n",
      " |-- petal_width: double (nullable = true)\n",
      " |-- label: integer (nullable = true)"
     ]
    }
   ],
   "source": [
    "encoder = StringIndexer(inputCol=\"variety\", outputCol=\"label\")\n",
    "model = encoder.fit(iris_df)\n",
    "iris_df1 = model.transform(iris_df)\n",
    "lookup_df = iris_df1.select([\"variety\", \"label\"]).distinct()\n",
    "iris_df2 = iris_df1.drop(\"variety\")\n",
    "iris_df3 = iris_df2.withColumn(\"label\", iris_df2[\"label\"].cast(IntegerType()))\n",
    "iris_df3.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+------------+-----------+------------+-----------+-----+\n",
      "|sepal_length|sepal_width|petal_length|petal_width|label|\n",
      "+------------+-----------+------------+-----------+-----+\n",
      "|         5.1|        3.5|         1.4|        0.2|    2|\n",
      "|         4.9|        3.0|         1.4|        0.2|    2|\n",
      "|         4.7|        3.2|         1.3|        0.2|    2|\n",
      "|         4.6|        3.1|         1.5|        0.2|    2|\n",
      "|         5.0|        3.6|         1.4|        0.2|    2|\n",
      "+------------+-----------+------------+-----------+-----+\n",
      "only showing top 5 rows"
     ]
    }
   ],
   "source": [
    "iris_df3.show(5)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----------+-----+\n",
      "|   variety|label|\n",
      "+----------+-----+\n",
      "| Virginica|  0.0|\n",
      "|Versicolor|  1.0|\n",
      "|    Setosa|  2.0|\n",
      "+----------+-----+"
     ]
    }
   ],
   "source": [
    "lookup_df.show(3)"
   ]
  },
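  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The lookup table matters because the label encoding has to be reversible. As a minimal pure-Python sketch of the same idea, independent of Spark: labels are indexed by descending frequency (the default ordering of `StringIndexer`), and the inverted mapping plays the role of the lookup table. The helper `build_label_lookup`, the toy data, and the alphabetical tie-break are assumptions made for illustration.\n",
    "\n",
    "```python\n",
    "from collections import Counter\n",
    "\n",
    "\n",
    "def build_label_lookup(labels):\n",
    "    # Index labels by descending frequency; ties are broken alphabetically\n",
    "    # (the tie-break is an assumption of this sketch).\n",
    "    counts = Counter(labels)\n",
    "    ordered = sorted(counts, key=lambda name: (-counts[name], name))\n",
    "    return {name: index for index, name in enumerate(ordered)}\n",
    "\n",
    "\n",
    "# Toy data, not the real dataset\n",
    "varieties = ['Setosa', 'Virginica', 'Virginica', 'Versicolor', 'Virginica', 'Setosa']\n",
    "\n",
    "to_index = build_label_lookup(varieties)                      # categorical -> numeric\n",
    "to_name = {index: name for name, index in to_index.items()}  # numeric -> categorical\n",
    "\n",
    "encoded = [to_index[v] for v in varieties]\n",
    "decoded = [to_name[i] for i in encoded]\n",
    "print(to_index)\n",
    "print(encoded)\n",
    "print(decoded == varieties)\n",
    "```\n",
    "\n",
    "In the notebook itself all three varieties occur equally often, so the indices shown in `lookup_df` depend on how Spark breaks ties. That is one more reason to persist the lookup table instead of recomputing it later."
   ]
  },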
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Save Features to the Feature Store\n",
    "\n",
    "We can save two feature groups (Hive tables): one called `iris_features` that contains the iris features and the corresponding numeric label, and another called `iris_labels_lookup` for converting the numeric iris label back to the categorical one.\n",
    "\n",
    "**Note**: To be able to run the feature store code, you first have to enable the Feature Store Service in your project. To do this, go to the \"Settings\" tab in your project, select the feature store service and click \"Save\". "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "computing descriptive statistics for : iris_features, version: 1\n",
      "computing feature correlation for: iris_features, version: 1\n",
      "computing feature histograms for: iris_features, version: 1\n",
      "computing cluster analysis for: iris_features, version: 1\n",
      "Running sql: use demo_deep_learning_admin000_featurestore\n",
      "Feature group created successfully"
     ]
    }
   ],
   "source": [
    "featurestore.create_featuregroup(iris_df3, \"iris_features\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "computing descriptive statistics for : iris_labels_lookup, version: 1\n",
      "Running sql: use demo_deep_learning_admin000_featurestore\n",
      "Feature group created successfully"
     ]
    }
   ],
   "source": [
    "featurestore.create_featuregroup(lookup_df, \"iris_labels_lookup\", feature_correlation=False, \n",
    "                                 feature_histograms=False, cluster_analysis=False)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Read the Iris Training Dataset from the Feature Store"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Running sql: use demo_deep_learning_admin000_featurestore\n",
      "SQL string for the query created successfully\n",
      "Running sql: SELECT * FROM iris_features_1"
     ]
    }
   ],
   "source": [
    "train_df = featurestore.get_featuregroup(\"iris_features\", dataframe_type=\"pandas\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "       sepal_length  sepal_width  petal_length  petal_width       label\n",
      "count    150.000000   150.000000    150.000000   150.000000  150.000000\n",
      "mean       5.843333     3.057333      3.758000     1.199333    1.000000\n",
      "std        0.828066     0.435866      1.765298     0.762238    0.819232\n",
      "min        4.300000     2.000000      1.000000     0.100000    0.000000\n",
      "25%        5.100000     2.800000      1.600000     0.300000    0.000000\n",
      "50%        5.800000     3.000000      4.350000     1.300000    1.000000\n",
      "75%        6.400000     3.300000      5.100000     1.800000    2.000000\n",
      "max        7.900000     4.400000      6.900000     2.500000    2.000000"
     ]
    }
   ],
   "source": [
    "train_df.describe()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {},
   "outputs": [],
   "source": [
    "x_df = train_df[['sepal_length', 'sepal_width', 'petal_length', 'petal_width']]\n",
    "y_df = train_df[[\"label\"]]\n",
    "X = x_df.values\n",
    "y = y_df.values.ravel()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Train a KNN Model using the Feature Data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "KNeighborsClassifier(algorithm='auto', leaf_size=30, metric='minkowski',\n",
      "                     metric_params=None, n_jobs=None, n_neighbors=5, p=2,\n",
      "                     weights='uniform')"
     ]
    }
   ],
   "source": [
    "iris_knn = KNeighborsClassifier()\n",
    "iris_knn.fit(X, y)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Save the Trained Model to HopsFS"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Started copying local path iris_knn.pkl to hdfs path hdfs://10.0.2.15:8020/Projects/demo_deep_learning_admin000/Resources/iris_knn.pkl\n",
      "\n",
      "Finished copying"
     ]
    }
   ],
   "source": [
    "joblib.dump(iris_knn, \"iris_knn.pkl\")\n",
    "hdfs.copy_to_hdfs(\"iris_knn.pkl\", \"Resources\", overwrite=True)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Constants"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "metadata": {},
   "outputs": [],
   "source": [
    "SERVING_NAME = \"IrisFlowerClassifier\"\n",
    "SERVING_VERSION = 1"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Export the Trained Model to Hopsworks Model Directory\n",
    "\n",
    "It is not required, but it is best practice to store trained models in the **Models** dataset in Hopsworks, with the directory structure indicating the version of each model. There is a utility function in the `hops` module for doing this. (You can also do it manually with the `hdfs` module and file operations, or by drag-and-drop in the Hopsworks UI.)\n",
    "\n",
    "Below is the code for exporting the model saved in `Resources/iris_knn.pkl` to `Models/IrisFlowerClassifier/1/iris_knn.pkl` using the `serving` module from `hops`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "'hdfs://10.0.2.15:8020/Projects/demo_deep_learning_admin000/Models/IrisFlowerClassifier/1/'"
     ]
    }
   ],
   "source": [
    "model_path = \"Resources/iris_knn.pkl\"\n",
    "serving.export(model_path, SERVING_NAME, SERVING_VERSION, overwrite=True)"
   ]
  },
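  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The versioned layout used here is `Models/<model name>/<version>/<file>`. A small sketch that builds such paths with the standard library only; `versioned_model_path` is a hypothetical helper for illustration and is not part of `hops`:\n",
    "\n",
    "```python\n",
    "import posixpath\n",
    "\n",
    "\n",
    "def versioned_model_path(model_name, version, filename):\n",
    "    # Models/<model name>/<version>/<file>, joined with '/' as on HDFS\n",
    "    return posixpath.join('Models', model_name, str(version), filename)\n",
    "\n",
    "\n",
    "print(versioned_model_path('IrisFlowerClassifier', 1, 'iris_knn.pkl'))\n",
    "print(versioned_model_path('IrisFlowerClassifier', 2, 'iris_knn.pkl'))\n",
    "```\n",
    "\n",
    "Keeping the version in the path means a retrained model can be exported next to the old one without overwriting it."
   ]
  },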
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Serve the Trained Model\n",
    "\n",
    "To serve a SkLearn model, write a Python script that defines a `Predict` class. The constructor downloads the model from HDFS and stores it as an instance attribute, and the class implements the methods `predict`, `classify` and `regress`, like this:\n",
    "\n",
    "```python\n",
    "import joblib\n",
    "from hops import hdfs\n",
    "import os\n",
    "\n",
    "class Predict(object):\n",
    "\n",
    "    def __init__(self):\n",
    "        \"\"\" Initializes the serving state, reads a trained model from HDFS\"\"\"\n",
    "        self.model_path = \"Models/IrisFlowerClassifier/1/iris_knn.pkl\"\n",
    "        print(\"Copying SKLearn model from HDFS to local directory\")\n",
    "        hdfs.copy_to_local(self.model_path)\n",
    "        print(\"Reading local SkLearn model for serving\")\n",
    "        self.model = joblib.load(\"./iris_knn.pkl\")\n",
    "        print(\"Initialization Complete\")\n",
    "\n",
    "\n",
    "    def predict(self, inputs):\n",
    "        \"\"\" Serves a prediction request using a trained model\"\"\"\n",
    "        return self.model.predict(inputs).tolist() # Numpy arrays are not JSON serializable\n",
    "\n",
    "    def classify(self, inputs):\n",
    "        \"\"\" Serves a classification request using a trained model\"\"\"\n",
    "        return \"not implemented\"\n",
    "\n",
    "    def regress(self, inputs):\n",
    "        \"\"\" Serves a regression request using a trained model\"\"\"\n",
    "        return \"not implemented\"\n",
    "```\n",
    "\n",
    "Then upload this Python script to a folder in your project and go to the \"Model Serving\" service in Hopsworks:\n",
    "\n",
    "![sklearn_serving1.png](./../../images/sklearn_serving1.png)\n",
    "\n",
    "Then click on \"create serving\" and configure your serving:\n",
    "\n",
    "![sklearn_serving2.png](./../../images/sklearn_serving2.png)\n",
    "\n",
    "Once the serving is created, you can start it and view information such as the server logs and the Kafka topic to which it logs inference requests.\n",
    "\n",
    "![sklearn_serving3.png](./../../images/sklearn_serving3.png)"
   ]
  },
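  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Before uploading a serving script, the `predict` contract can be checked locally: it receives a list of feature rows and must return something JSON serializable. The sketch below is an illustration only. `StubModel` and its threshold rule are hypothetical stand-ins for the unpickled KNN model, the constructor takes the model as an argument instead of reading it from HDFS, and the `predictions` envelope is assumed from the responses printed later in this notebook.\n",
    "\n",
    "```python\n",
    "import json\n",
    "\n",
    "\n",
    "class StubModel(object):\n",
    "    # Hypothetical stand-in for the trained model (not a KNN)\n",
    "    def predict(self, inputs):\n",
    "        # Label a row 0 if petal_length (third feature) exceeds 2.5, else 2\n",
    "        return [0 if row[2] > 2.5 else 2 for row in inputs]\n",
    "\n",
    "\n",
    "class Predict(object):\n",
    "    def __init__(self, model):\n",
    "        # The real script loads the model from HDFS here instead\n",
    "        self.model = model\n",
    "\n",
    "    def predict(self, inputs):\n",
    "        # Return a plain list so that the result is JSON serializable\n",
    "        return list(self.model.predict(inputs))\n",
    "\n",
    "\n",
    "request = {'inputs': [[5.1, 3.5, 1.4, 0.2], [6.3, 3.3, 6.0, 2.5]]}\n",
    "response = {'predictions': Predict(StubModel()).predict(request['inputs'])}\n",
    "print(json.dumps(response))\n",
    "```\n",
    "\n",
    "If `json.dumps` raises a `TypeError` at this point, the serving would fail in the same way, which is why the real script converts the NumPy result with `tolist()`."
   ]
  },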
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "A prepared example script for serving the sklearn IrisFlowerClassifier can be found here: https://github.com/logicalclocks/hops-examples/tree/master/tensorflow/notebooks/Serving \n",
    "\n",
    "It is best practice to store the script together with the trained model. Below is the code for exporting the script from `Jupyter/Serving/sklearn/iris_flower_classifier.py` to `Models/IrisFlowerClassifier/1/iris_flower_classifier.py`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 24,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "'hdfs://10.0.2.15:8020/Projects/demo_deep_learning_admin000/Models/IrisFlowerClassifier/1/'"
     ]
    }
   ],
   "source": [
    "script_path = \"Jupyter/Serving/sklearn/iris_flower_classifier.py\"\n",
    "serving.export(script_path, SERVING_NAME, SERVING_VERSION, overwrite=True)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Once all the files have been exported to the model directory, we can create a serving instance that points to the model files using `serving.create_or_update()`. First, list the model directory to confirm that both files are in place:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 25,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "hdfs://10.0.2.15:8020/Projects/demo_deep_learning_admin000/Models/IrisFlowerClassifier/1\n",
      "hdfs://10.0.2.15:8020/Projects/demo_deep_learning_admin000/Models/IrisFlowerClassifier/1/iris_flower_classifier.py\n",
      "hdfs://10.0.2.15:8020/Projects/demo_deep_learning_admin000/Models/IrisFlowerClassifier/1/iris_knn.pkl"
     ]
    }
   ],
   "source": [
    "for p in hdfs.ls(\"Models/\" + SERVING_NAME, recursive=True):\n",
    "    print(p)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To update an existing serving, set the argument `update=True` in `serving.create_or_update()`. To delete an existing serving, call `serving.delete()`. "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 26,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "No serving with name IrisFlowerClassifier was found in the project demo_deep_learning_admin000\n",
      "Creating a serving for model IrisFlowerClassifier ...\n",
      "Serving for model IrisFlowerClassifier successfully created"
     ]
    }
   ],
   "source": [
    "script_path = \"Models/\" + SERVING_NAME + \"/\" + str(SERVING_VERSION) + \"/iris_flower_classifier.py\"\n",
    "if serving.exists(SERVING_NAME):\n",
    "    serving.delete(SERVING_NAME)\n",
    "serving.create_or_update(script_path, SERVING_NAME, serving_type=\"SKLEARN\", \n",
    "                                 model_version=SERVING_VERSION)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "After the serving has been created, you can find it in the Hopsworks UI by going to the \"Model Serving\" tab. You can also use the `serving` module to query the Hopsworks REST API for information about the existing servings, using methods like: \n",
    "\n",
    "- `get_all()`\n",
    "- `get_id(serving_name)`\n",
    "- `get_artifact_path(serving_name)`\n",
    "- `get_type(serving_name)`\n",
    "- `get_version(serving_name)`\n",
    "- `get_kafka_topic(serving_name)`\n",
    "- `get_status(serving_name)`\n",
    "- `exists(serving_name)`\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 27,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "IrisFlowerClassifier\n",
      "mnist"
     ]
    }
   ],
   "source": [
    "for s in serving.get_all():\n",
    "    print(s.name)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 28,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "8"
     ]
    }
   ],
   "source": [
    "serving.get_id(SERVING_NAME)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 29,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "'/Projects/demo_deep_learning_admin000/Models/IrisFlowerClassifier/1/iris_flower_classifier.py'"
     ]
    }
   ],
   "source": [
    "serving.get_artifact_path(SERVING_NAME)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 30,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "'SKLEARN'"
     ]
    }
   ],
   "source": [
    "serving.get_type(SERVING_NAME)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 31,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "1"
     ]
    }
   ],
   "source": [
    "serving.get_version(SERVING_NAME)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 32,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "'IrisFlowerClassifier-inf9621'"
     ]
    }
   ],
   "source": [
    "serving.get_kafka_topic(SERVING_NAME)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 33,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "'Stopped'"
     ]
    }
   ],
   "source": [
    "serving.get_status(SERVING_NAME)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "You can start and stop the serving instance either from the Hopsworks UI or from the Python/REST API, as demonstrated below."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 34,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Starting serving with name: IrisFlowerClassifier...\n",
      "Serving with name: IrisFlowerClassifier successfully started"
     ]
    }
   ],
   "source": [
    "serving.start(SERVING_NAME)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 35,
   "metadata": {},
   "outputs": [],
   "source": [
    "time.sleep(5) # Let the serving startup correctly"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 36,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Stopping serving with name: IrisFlowerClassifier...\n",
      "Serving with name: IrisFlowerClassifier successfully stopped"
     ]
    }
   ],
   "source": [
    "serving.stop(SERVING_NAME)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 37,
   "metadata": {},
   "outputs": [],
   "source": [
    "time.sleep(5) # Let the serving stop and cleanup correctly"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 38,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Starting serving with name: IrisFlowerClassifier...\n",
      "Serving with name: IrisFlowerClassifier successfully started"
     ]
    }
   ],
   "source": [
    "serving.start(SERVING_NAME)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 39,
   "metadata": {},
   "outputs": [],
   "source": [
    "time.sleep(10) # Let the serving startup correctly before sending inference requests"
   ]
  },
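  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "A fixed `time.sleep` works for a demo, but it either waits too long or not long enough. One alternative is to poll the status until it reaches the expected value. The sketch below is a generic helper written for illustration: `wait_for_status` is hypothetical (not part of `hops`), and the status strings `'Starting'` and `'Running'` are assumptions, since only `'Stopped'` appears in the output above.\n",
    "\n",
    "```python\n",
    "import time\n",
    "\n",
    "\n",
    "def wait_for_status(get_status, target, timeout_s=60.0, interval_s=1.0):\n",
    "    # Poll get_status() until it returns target; give up after timeout_s seconds\n",
    "    deadline = time.time() + timeout_s\n",
    "    while True:\n",
    "        if get_status() == target:\n",
    "            return True\n",
    "        if time.time() >= deadline:\n",
    "            return False\n",
    "        time.sleep(interval_s)\n",
    "\n",
    "\n",
    "# Fake status source so that the sketch runs without a Hopsworks cluster\n",
    "_states = iter(['Starting', 'Starting', 'Running'])\n",
    "ready = wait_for_status(lambda: next(_states), 'Running', timeout_s=5.0, interval_s=0.01)\n",
    "print(ready)\n",
    "```\n",
    "\n",
    "In this notebook the helper could be called as `wait_for_status(lambda: serving.get_status(SERVING_NAME), 'Running')`, provided that `'Running'` really is the status reported for a started serving."
   ]
  },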
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Send Prediction Requests to the Served Model using Hopsworks REST API"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Constants"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 40,
   "metadata": {},
   "outputs": [],
   "source": [
    "TOPIC_NAME = serving.get_kafka_topic(SERVING_NAME)\n",
    "NUM_FEATURES = 4"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To make inference requests, you can use the utility method `serving.make_inference_request`:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 41,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "{'predictions': [0]}\n",
      "{'predictions': [0]}\n",
      "{'predictions': [1]}\n",
      "{'predictions': [1]}\n",
      "{'predictions': [0]}\n",
      "{'predictions': [0]}\n",
      "{'predictions': [0]}\n",
      "{'predictions': [0]}\n",
      "{'predictions': [0]}\n",
      "{'predictions': [1]}\n",
      "{'predictions': [0]}\n",
      "{'predictions': [1]}\n",
      "{'predictions': [2]}\n",
      "{'predictions': [0]}\n",
      "{'predictions': [2]}\n",
      "{'predictions': [0]}\n",
      "{'predictions': [0]}\n",
      "{'predictions': [0]}\n",
      "{'predictions': [0]}\n",
      "{'predictions': [0]}"
     ]
    }
   ],
   "source": [
    "for i in range(20):\n",
    "    data = {\"inputs\" : [[random.uniform(1, 8) for _ in range(NUM_FEATURES)]]}\n",
    "    response = serving.make_inference_request(SERVING_NAME, data)\n",
    "    print(response)"
   ]
  },
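  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The request body used above is a dictionary with an `inputs` key that holds a list of feature rows, each with `NUM_FEATURES` values. A small sketch of a helper that validates rows before they are sent; `build_request` is a hypothetical name introduced here, and the sample rows are typical iris measurements rather than random numbers:\n",
    "\n",
    "```python\n",
    "import json\n",
    "\n",
    "NUM_FEATURES = 4\n",
    "\n",
    "\n",
    "def build_request(rows, num_features=NUM_FEATURES):\n",
    "    # Reject rows with the wrong number of features before calling the serving\n",
    "    for row in rows:\n",
    "        if len(row) != num_features:\n",
    "            raise ValueError('expected %d features, got %d' % (num_features, len(row)))\n",
    "    # Cast to float so that the payload is always JSON serializable\n",
    "    return {'inputs': [[float(v) for v in row] for row in rows]}\n",
    "\n",
    "\n",
    "payload = build_request([[5.1, 3.5, 1.4, 0.2], [6.3, 3.3, 6, 2.5]])\n",
    "print(json.dumps(payload))\n",
    "```\n",
    "\n",
    "The resulting `payload` has the same shape as `data` in the loop above, so it can be passed to `serving.make_inference_request(SERVING_NAME, payload)` in the same way."
   ]
  },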
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Monitor Prediction Requests and Responses using Kafka\n",
    "\n",
    "All prediction requests are automatically logged to Kafka, which means that you can keep track of your model's performance and its predictions in a scalable manner.\n",
    "\n",
    "**Note**: The code below (in particular the Avro parsing) has only been tested on Python 2.7"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "##### Setup Kafka Consumer and Subscribe to the Topic containing the Inference Logs"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 42,
   "metadata": {},
   "outputs": [],
   "source": [
    "config = kafka.get_kafka_default_config()\n",
    "config['default.topic.config'] = {'auto.offset.reset': 'earliest'}\n",
    "consumer = Consumer(config)\n",
    "topics = [TOPIC_NAME]\n",
    "consumer.subscribe(topics)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "##### Read Kafka Avro Schema From Hopsworks and setup an Avro Reader"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 43,
   "metadata": {},
   "outputs": [],
   "source": [
    "json_schema = kafka.get_schema(TOPIC_NAME)\n",
    "avro_schema = kafka.convert_json_schema_to_avro(json_schema)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "##### Read Lookup Table from the Feature Store for Converting Numerical Labels to Categorical"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 44,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Running sql: use demo_deep_learning_admin000_featurestore\n",
      "SQL string for the query created successfully\n",
      "Running sql: SELECT * FROM iris_labels_lookup_1"
     ]
    }
   ],
   "source": [
    "iris_labels_lookup_df = featurestore.get_featuregroup(\"iris_labels_lookup\", dataframe_type=\"pandas\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "##### Read 10 Messages from the Kafka Topic, parse them with the Avro Schema and print the results"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 45,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "serving: IrisFlowerClassifier, version: 1, timestamp: 1560339963528,\n",
      "request: {\"inputs\": [[4.441035498836311, 1.0405422685903551, 4.950509272511928, 7.201979298717179]]},\n",
      "prediction:0, prediction_label:Virginica, http_response_code: 200, serving_type: SKLEARN\n",
      "\n",
      "serving: IrisFlowerClassifier, version: 1, timestamp: 1560339963656,\n",
      "request: {\"inputs\": [[1.1031470267993444, 7.509936088602124, 7.228032694461817, 3.7487129040092997]]},\n",
      "prediction:0, prediction_label:Virginica, http_response_code: 200, serving_type: SKLEARN\n",
      "\n",
      "serving: IrisFlowerClassifier, version: 1, timestamp: 1560339963767,\n",
      "request: {\"inputs\": [[2.235706745836112, 3.093572695191493, 2.7323921162142804, 2.0689962421316657]]},\n",
      "prediction:1, prediction_label:Versicolor, http_response_code: 200, serving_type: SKLEARN\n",
      "\n",
      "serving: IrisFlowerClassifier, version: 1, timestamp: 1560339963896,\n",
      "request: {\"inputs\": [[1.3205991163619348, 2.0595736232497166, 4.985849386074168, 1.4408294851114367]]},\n",
      "prediction:1, prediction_label:Versicolor, http_response_code: 200, serving_type: SKLEARN\n",
      "\n",
      "serving: IrisFlowerClassifier, version: 1, timestamp: 1560339964020,\n",
      "request: {\"inputs\": [[4.220270978423765, 2.8304129673592966, 7.424421983383942, 5.361722367249203]]},\n",
      "prediction:0, prediction_label:Virginica, http_response_code: 200, serving_type: SKLEARN\n",
      "\n",
      "serving: IrisFlowerClassifier, version: 1, timestamp: 1560339964124,\n",
      "request: {\"inputs\": [[2.6970999685623243, 6.347149730290876, 5.39836695361557, 5.2666212297525705]]},\n",
      "prediction:0, prediction_label:Virginica, http_response_code: 200, serving_type: SKLEARN\n",
      "\n",
      "serving: IrisFlowerClassifier, version: 1, timestamp: 1560339964238,\n",
      "request: {\"inputs\": [[3.117586271070294, 2.2683075659770253, 2.7101690021986586, 6.041976280921667]]},\n",
      "prediction:0, prediction_label:Virginica, http_response_code: 200, serving_type: SKLEARN\n",
      "\n",
      "serving: IrisFlowerClassifier, version: 1, timestamp: 1560339964350,\n",
      "request: {\"inputs\": [[7.320109368200581, 3.7870289879752477, 5.293915277978317, 5.76674104896385]]},\n",
      "prediction:0, prediction_label:Virginica, http_response_code: 200, serving_type: SKLEARN\n",
      "\n",
      "serving: IrisFlowerClassifier, version: 1, timestamp: 1560339964452,\n",
      "request: {\"inputs\": [[4.314981146863798, 3.893368580779179, 6.288495981583821, 4.539703355969132]]},\n",
      "prediction:0, prediction_label:Virginica, http_response_code: 200, serving_type: SKLEARN\n",
      "\n",
      "serving: IrisFlowerClassifier, version: 1, timestamp: 1560339964556,\n",
      "request: {\"inputs\": [[5.850494074522825, 5.127440379139064, 2.628009669953899, 2.9049280774589574]]},\n",
      "prediction:1, prediction_label:Versicolor, http_response_code: 200, serving_type: SKLEARN"
     ]
    }
   ],
   "source": [
    "for i in range(0, 10):\n",
    "    msg = consumer.poll(timeout=1.0)\n",
    "    if msg is not None:\n",
    "        value = msg.value()\n",
    "        try:\n",
    "            event_dict = kafka.parse_avro_msg(value, avro_schema)\n",
    "            prediction = json.loads(event_dict[\"inferenceResponse\"])[\"predictions\"][0]\n",
    "            prediction_label = iris_labels_lookup_df.loc[iris_labels_lookup_df['label'] == prediction, \n",
    "                                                         'variety'].iloc[0]\n",
    "            print(\"serving: {}, version: {}, timestamp: {},\"\\\n",
    "                  \"\\nrequest: {},\\nprediction:{}, prediction_label:{}, http_response_code: {},\"\\\n",
    "                  \" serving_type: {}\\n\".format(\n",
    "                                                                   event_dict[\"modelName\"],\n",
    "                                                                   event_dict[\"modelVersion\"],\n",
    "                                                                   event_dict[\"requestTimestamp\"],\n",
    "                                                                   event_dict[\"inferenceRequest\"],\n",
    "                                                                   prediction,\n",
    "                                                                   prediction_label,\n",
    "                                                                   event_dict[\"responseHttpCode\"],\n",
    "                                                                   event_dict[\"servingType\"]\n",
    "            ))\n",
    "        except Exception as e:\n",
    "            print(\"A message was read but there was an error parsing it\")\n",
    "            print(e)\n",
    "    else:\n",
    "        print(\"timeout.. no more messages to read from topic\")"
   ]
  },
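  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The decoding step in the loop above can be isolated and tested without a Kafka connection. The sketch below assumes an already parsed event dictionary with the same field names used above (`modelName`, `modelVersion`, `inferenceResponse`, `responseHttpCode`). The helper `summarize_event`, the sample event, and the plain-dictionary `LABELS` (copied from the lookup table printed earlier) are illustrative assumptions.\n",
    "\n",
    "```python\n",
    "import json\n",
    "\n",
    "# Numeric label -> variety, as printed by lookup_df.show() earlier in this notebook\n",
    "LABELS = {0: 'Virginica', 1: 'Versicolor', 2: 'Setosa'}\n",
    "\n",
    "\n",
    "def summarize_event(event, labels):\n",
    "    # inferenceResponse is a JSON string; take the first prediction in it\n",
    "    prediction = json.loads(event['inferenceResponse'])['predictions'][0]\n",
    "    return {\n",
    "        'model': event['modelName'],\n",
    "        'version': event['modelVersion'],\n",
    "        'prediction': prediction,\n",
    "        'label': labels.get(prediction, 'unknown'),\n",
    "        'ok': event['responseHttpCode'] == 200,\n",
    "    }\n",
    "\n",
    "\n",
    "# Hand-written sample event, not read from Kafka\n",
    "sample = {\n",
    "    'modelName': 'IrisFlowerClassifier',\n",
    "    'modelVersion': 1,\n",
    "    'inferenceResponse': '{\"predictions\": [1]}',\n",
    "    'responseHttpCode': 200,\n",
    "}\n",
    "summary = summarize_event(sample, LABELS)\n",
    "print(summary['label'])\n",
    "```\n",
    "\n",
    "Using `labels.get` with a default keeps the monitoring loop running if the model ever returns a label that is missing from the lookup table."
   ]
  },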
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "PySpark",
   "language": "",
   "name": "pysparkkernel"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "python",
    "version": 2
   },
   "mimetype": "text/x-python",
   "name": "pyspark",
   "pygments_lexer": "python2"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}